#include "config.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "cache.h"
#include "disk.h"
#include "sysy_lib.h"
#include "ynet_rpc.h"
#include "job_dock.h"
#include "net_global.h"
#include "disk_aio.h"
#include "bh.h"
#include "tpool.h"
#include "disk.h"
#include "schedule.h"
#include "configure.h"
#include "dbg.h"

/*
 * Shared state of the threaded disk I/O queue. A single instance is
 * allocated by disk_mt_init() and reached through the file-scope
 * pointer disk_mt.
 */
typedef struct {
        /* posted once for every request appended to list; the worker
         * threads block on it before dequeuing */
        sem_t sem;
        /* libaio context created with io_setup() in disk_mt_init(); the
         * code visible in this file does not use it afterwards */
        io_context_t  ctx;
        /* taken around every insertion into / removal from list */
        sy_spinlock_t lock;
        /* pending iocb_mt_t requests: appended at the tail, consumed from
         * the head */
        struct list_head list;
} disk_mt_t;

/*
 * One queued vectored I/O request, handed from the submitting task to a
 * worker thread through disk_mt->list.
 */
typedef struct {
        /* list linkage; must remain the FIRST member because
         * __disk_mt_submit() casts the list node pointer directly to
         * iocb_mt_t * */
        struct list_head hook;
        /* 'w' selects pwritev(), 'r' selects preadv() */
        char op;
        /* file descriptor the transfer is issued on */
        int fd;
        /* scatter/gather vector and its element count */
        struct iovec *iov;
        int iov_count;
        /* absolute file offset of the transfer */
        off_t offset;
        /* task_t * of the submitter; passed to schedule_resume() once the
         * transfer has completed */
        void *task;
} iocb_mt_t;


/* Event capacity requested from io_setup() in disk_mt_init(). */
#define __IOCB_MAX__ 4096
#define MAX_SUBMIT  __IOCB_MAX__
/* The single queue instance; allocated and zeroed by disk_mt_init(). */
static disk_mt_t  *disk_mt;

/*
 * Dequeue the request at the head of disk_mt->list, perform it
 * synchronously with pwritev()/preadv(), and resume the task that
 * submitted it.
 *
 * The caller must already have consumed one count of disk_mt->sem, so
 * the list is asserted to be non-empty.
 *
 * Returns 0 on success. Any failure (lock or I/O) ends up in
 * UNIMPLEMENTED(__DUMP__) before the error code is returned.
 *
 * NOTE(review): a transfer shorter than requested is treated as success;
 * only a negative return from pwritev()/preadv() counts as an error.
 */
static int __disk_mt_submit()
{
        int ret;
        iocb_mt_t *req;

        ret = sy_spin_lock(&disk_mt->lock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(!list_empty(&disk_mt->list));
        /* hook is the first member of iocb_mt_t, so the address of the
         * head node is also the address of the request itself */
        req = (void *)disk_mt->list.next;
        list_del(&req->hook);

        sy_spin_unlock(&disk_mt->lock);

        ANALYSIS_BEGIN(0);

        switch (req->op) {
        case 'w':
                ret = pwritev(req->fd, req->iov, req->iov_count, req->offset);
                break;
        default:
                /* anything that is not a write must be a read */
                YASSERT(req->op == 'r');
                ret = preadv(req->fd, req->iov, req->iov_count, req->offset);
                break;
        }

        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        /* wake the submitter, which is waiting in schedule_yield() */
        schedule_resume((task_t *)req->task, 0, NULL);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return ret;
}

/*
 * Thread entry point of a worker: wait for a request to be signalled on
 * disk_mt->sem, execute it through __disk_mt_submit(), and repeat.
 *
 * The loop only ends when waiting or submitting fails, in which case
 * UNIMPLEMENTED(__DUMP__) is invoked. The thread argument is unused and
 * the return value is always NULL.
 */
static void *__submit_mt_worker(void *_args)
{
        int ret;

        (void) _args;

        for (;;) {
                ret = _sem_wait(&disk_mt->sem);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                ret = __disk_mt_submit();
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

/*
 * Append a request to the tail of disk_mt->list under the queue lock and
 * post disk_mt->sem once so a worker picks it up.
 *
 * Returns 0 on success, or the error reported by sy_spin_lock(); in the
 * latter case nothing has been queued.
 *
 * NOTE(review): the result of sem_post() is not checked.
 */
static int __disk_mt_queue(iocb_mt_t *iocb)
{
        int ret = sy_spin_lock(&disk_mt->lock);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        list_add_tail(&iocb->hook, &disk_mt->list);
        sy_spin_unlock(&disk_mt->lock);

        /* one post per queued request */
        sem_post(&disk_mt->sem);

        ret = 0;
err_ret:
        return ret;
}

/*
 * Common implementation of disk_mt_read() / disk_mt_write().
 *
 * Resolves loc to a cached file descriptor and base offset, flattens buf
 * into an iovec array, queues the request for the worker threads and
 * yields the current task until a worker resumes it.
 *
 * loc      location handed to disk_get_cachefd()
 * buf      data buffer; its full length must fit in LICH_IOV_MAX vectors
 *          (asserted)
 * _offset  added to the base offset returned by disk_get_cachefd()
 * op       'r' for read, 'w' for write
 *
 * Returns 0 on success, otherwise the error from disk_get_cachefd(),
 * __disk_mt_queue() or schedule_yield().
 *
 * The request, the iovec array and the task handle all live in this
 * function's frame.
 * NOTE(review): this presumably relies on the frame staying intact while
 * the task is suspended in schedule_yield() -- confirm against the
 * scheduler.
 */
static int __disk_mt(const diskloc_t *loc, buffer_t *buf, int _offset, char op)
{
        int ret, fd;
        int nvec = LICH_IOV_MAX;
        uint64_t base;
        struct iovec vec[LICH_IOV_MAX];
        iocb_mt_t req;
        task_t self = schedule_task_get();

        ret = disk_get_cachefd(loc, &fd, &base);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* nvec goes in as the capacity of vec and comes back as the number
         * of entries actually filled */
        ret = mbuffer_trans(vec, &nvec, buf);
        YASSERT(ret == (int)buf->len);

        req.task = &self;
        req.fd = fd;
        req.offset = base + _offset;
        req.iov = vec;
        req.iov_count = nvec;
        req.op = op;

        ret = __disk_mt_queue(&req);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DBUG("aio yield\n");
        /* NOTE(review): the same yield label is used for reads and writes */
        ret = schedule_yield("disk_mt_read", NULL, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
        DBUG("aio resume\n");

        return 0;
err_ret:
        return ret;
}

/*
 * Read into buf from the location loc, starting _offset bytes past the
 * base offset of that location, using the worker-thread queue.
 * Returns whatever __disk_mt() returns (0 on success).
 */
int disk_mt_read(const diskloc_t *loc, buffer_t *buf, int _offset)
{
        int ret;

        DBUG("mt read\n");
        ret = __disk_mt(loc, buf, _offset, 'r');

        return ret;
}

/*
 * Write buf to the location loc, starting _offset bytes past the base
 * offset of that location, using the worker-thread queue.
 * Returns whatever __disk_mt() returns (0 on success).
 */
int disk_mt_write(const diskloc_t *loc, const buffer_t *buf, int _offset)
{
        int ret;

        DBUG("mt write\n");
        /* __disk_mt() takes a non-const buffer because it is shared with
         * the read path, hence the cast */
        ret = __disk_mt(loc, (void *)buf, _offset, 'w');

        return ret;
}

int disk_mt_init()
{
        int ret, size, i;
        pthread_t th;
        pthread_attr_t ta;

        size = sizeof(disk_mt_t);
        ret = ymalloc((void **)&disk_mt, size);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(disk_mt, 0x0, size);

        INIT_LIST_HEAD(&disk_mt->list);

        ret = io_setup(MAX_SUBMIT, &disk_mt->ctx);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        ret = sy_spin_init(&disk_mt->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sem_init(&disk_mt->sem, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < 128; i++) {
                (void) pthread_attr_init(&ta);
                (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

                ret = pthread_create(&th, &ta, __submit_mt_worker, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}
